package com.xl.competition.modul_b.task2

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * @author: xl
 * @createTime: 2023/11/15 16:18:10
 * @program: com.xl.competition
 * @description: Incrementally loads new/updated rows from ods.order_info into the partitioned Hive fact table dwd.fact_order_info.
 */
object LoadOrderInfoToDwd {

  /**
   * Incremental ODS -> DWD load for order facts.
   *
   * Reads rows from `ods.order_info` that are newer than the data already
   * loaded, stamps audit columns, and appends them to the Hive table
   * `dwd.fact_order_info`, partitioned by `etl_date`.
   *
   * Watermark semantics: the high-water mark is the greatest
   * create_time/operate_time already present in the TARGET table
   * (`dwd.fact_order_info`), so only genuinely new or updated orders are
   * appended. The original code read the watermark from `ods.user_info`,
   * an unrelated table (copy-paste from the user_info task), which made the
   * incremental filter meaningless.
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getName)
      .enableHiveSupport()
      .config("hive.metastore.uris", "thrift://node2:9083")
      // Legacy parquet format so Hive (pre-Spark timestamp/decimal layout) can read the files.
      .config("spark.sql.parquet.writeLegacyFormat", "true")
      .getOrCreate()

    // High-water mark = max(create_time, operate_time) over rows already loaded.
    // greatest() is guarded with coalesce because operate_time may be NULL per row,
    // and the whole expression is coalesced to 0 so that a first run against an
    // empty target table loads everything instead of NPE-ing in getLong(0).
    // (The original `if(max(a) > max(nvl(b,0)), max(a), max(b))` returned NULL
    // in the else branch whenever operate_time was entirely NULL.)
    val timestamp: Long = spark.sql(
      """
        |select coalesce(
        |           max(greatest(unix_timestamp(create_time),
        |                        coalesce(unix_timestamp(operate_time), 0))),
        |           0)
        |from dwd.fact_order_info
        |""".stripMargin)
      .first()
      .getLong(0)

    // Pull only rows newer than the watermark. NULL timestamps compare as
    // unknown and are correctly excluded by the OR predicate.
    // NOTE(review): create_time is re-formatted to yyyyMMdd (same value as the
    // etl_date partition column) — assumed to be the task specification; the
    // original full timestamp is not preserved in dwd.
    val orderInfoDF: DataFrame = spark.sql(
      s"""
        |select id,
        |       consignee,
        |       consignee_tel,
        |       total_amount,
        |       order_status,
        |       user_id,
        |       payment_way,
        |       delivery_address,
        |       order_comment,
        |       out_trade_no,
        |       trade_body,
        |       from_unixtime(unix_timestamp(nvl(create_time, operate_time)),'yyyyMMdd') as  create_time,
        |       from_unixtime(unix_timestamp(nvl(create_time, operate_time)),'yyyyMMdd') as  etl_date,
        |       operate_time,
        |       expire_time,
        |       process_status,
        |       tracking_no,
        |       parent_order_id,
        |       img_url,
        |       province_id,
        |       activity_reduce_amount,
        |       coupon_reduce_amount,
        |       original_total_amount,
        |       feight_fee,
        |       feight_fee_reduce,
        |       refundable_time,
        |       'user1'                            dwd_insert_user,
        |       substr(current_timestamp(), 1, 19) dwd_insert_time,
        |       'user1'                            dwd_modify_user,
        |       substr(current_timestamp(), 1, 19) dwd_modify_time
        |from ods.order_info
        |where unix_timestamp(create_time) > $timestamp or unix_timestamp(operate_time) > $timestamp
        |""".stripMargin)

    // Append (not overwrite): each run adds only the new increment; etl_date
    // partitioning keeps each load in its own date partition.
    orderInfoDF.write
      .partitionBy("etl_date")
      .mode(SaveMode.Append)
      .format("hive")
      .saveAsTable("dwd.fact_order_info")

    spark.stop()
  }
}
